{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "635d8ebb",
   "metadata": {},
   "source": [
    "# WithListeners\n",
    "\n",
    "- Author: [Donghak Lee](https://github.com/stsr1284)\n",
    "- Peer Review:\n",
    "- Proofread : [Q0211](https://github.com/Q0211)\n",
    "- This is a part of [LangChain Open Tutorial](https://github.com/LangChain-OpenTutorial/LangChain-OpenTutorial)\n",
    "\n",
    "[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/LangChain-OpenTutorial/LangChain-OpenTutorial/blob/main/13-LangChain-Expression-Language/13-WithListeners.ipynb)[![Open in GitHub](https://img.shields.io/badge/Open%20in%20GitHub-181717?style=flat-square&logo=github&logoColor=white)](https://github.com/LangChain-OpenTutorial/LangChain-OpenTutorial/blob/main/13-LangChain-Expression-Language/13-WithListeners.ipynb)\n",
    "## Overview\n",
    "\n",
    "This tutorial covers the implementation and usage of ```with_listeners()``` in ```Runnable```.\n",
    "\n",
    "```with_listeners()``` binds lifecycle listeners to a Runnable, returning a new Runnable. This allows you to connect event listeners to the data flow, enabling tracking, analysis, and debugging during execution.\n",
    "\n",
    "A listener is a function that is called with the ```Run``` object when a specific lifecycle event occurs: the run starts, ends, or raises an error.\n",
    "\n",
    "This method is useful in the following scenarios:\n",
    "\n",
    "- Logging the start and end of data processing\n",
    "\n",
    "- Triggering notifications on errors\n",
    "\n",
    "- Printing debugging information\n",
    "\n",
    "### Table of Contents\n",
    "\n",
    "- [Overview](#overview)\n",
    "- [Environment Setup](#environment-setup)\n",
    "- [with_listeners](#with_listeners)\n",
    "- [with_alisteners](#with_alisteners)\n",
    "- [RootListenersTracer](#rootlistenerstracer)\n",
    "\n",
    "### References\n",
    "\n",
    "- [LangChain with_listeners](https://python.langchain.com/v0.2/api_reference/core/runnables/langchain_core.runnables.base.Runnable.html#langchain_core.runnables.base.Runnable.with_listeners)\n",
    "- [LangChain RootListenersTracer](https://python.langchain.com/v0.2/api_reference/core/tracers/langchain_core.tracers.root_listeners.RootListenersTracer.html)\n",
    "----"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c6c7aba4",
   "metadata": {},
   "source": [
    "## Environment Setup\n",
    "\n",
    "Set up the environment. You may refer to [Environment Setup](https://wikidocs.net/257836) for more details.\n",
    "\n",
    "**[Note]**\n",
    "- ```langchain-opentutorial``` is a package that provides easy-to-use environment setup, helper functions, and utilities for the tutorials.\n",
    "- See the [```langchain-opentutorial```](https://github.com/LangChain-OpenTutorial/langchain-opentutorial-pypi) repository for more details."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "21943adb",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%capture --no-stderr\n",
    "%pip install langchain-opentutorial"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "f25ec196",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Install required packages\n",
    "from langchain_opentutorial import package\n",
    "\n",
    "package.install(\n",
    "    [\n",
    "        \"langchain_core\",\n",
    "        \"langchain_openai\",\n",
    "        \"python-dotenv\",\n",
    "    ],\n",
    "    verbose=False,\n",
    "    upgrade=False,\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "id": "7f9065ea",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Environment variables have been set successfully.\n"
     ]
    }
   ],
   "source": [
    "# Set environment variables\n",
    "from langchain_opentutorial import set_env\n",
    "\n",
    "set_env(\n",
    "    {\n",
    "        \"OPENAI_API_KEY\": \"\",\n",
    "        \"LANGCHAIN_API_KEY\": \"\",\n",
    "        \"LANGCHAIN_TRACING_V2\": \"true\",\n",
    "        \"LANGCHAIN_ENDPOINT\": \"https://api.smith.langchain.com\",\n",
    "        \"LANGCHAIN_PROJECT\": \"WithListeners\",\n",
    "    }\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "690a9ae0",
   "metadata": {},
   "source": [
    "You can alternatively set API keys such as ```OPENAI_API_KEY``` in a ```.env``` file and load them.\n",
    "\n",
    "[Note] This is not necessary if you've already set the required API keys in previous steps."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4f99b5b6",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Load API keys from .env file\n",
    "from dotenv import load_dotenv\n",
    "\n",
    "load_dotenv(override=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2b2fc536",
   "metadata": {},
   "source": [
    "## with_listeners\n",
    "\n",
    "```with_listeners()``` accepts up to three optional listener functions as keyword arguments (```on_start```, ```on_end```, ```on_error```) and returns a new ```Runnable``` object. Each listener is called when the corresponding event occurs.\n",
    "\n",
    "Listeners let you observe the data flow without changing the wrapped logic, and you can attach them to any ```Runnable``` using ```with_listeners()```.\n",
    "\n",
    "The following example attaches listeners to two ```RunnableLambda``` steps and chains them."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "id": "fed852d1",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Start: {'input': 'Hello, World!'}\n",
      "End: {'output': 'Step 1 completed with message Hello, World!'}\n",
      "Start: {'input': 'Step 1 completed with message Hello, World!'}\n",
      "End: {'output': 'Step 2 completed with message Step 1 completed with message Hello, World!'}\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "'Step 2 completed with message Step 1 completed with message Hello, World!'"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from langchain_core.runnables import RunnableLambda\n",
    "import time\n",
    "\n",
    "\n",
    "# Define tasks for each Runnable\n",
    "def stepOne(message):\n",
    "    time.sleep(1)  # Wait for 1 second\n",
    "    return f\"Step 1 completed with message {message}\"\n",
    "\n",
    "\n",
    "def stepTwo(message):\n",
    "    time.sleep(2)  # Wait for 2 seconds\n",
    "    return f\"Step 2 completed with message {message}\"\n",
    "\n",
    "\n",
    "# Define listener functions\n",
    "def fnStart(runObj):\n",
    "    print(f\"Start: {runObj.inputs}\")\n",
    "\n",
    "\n",
    "def fnEnd(runObj):\n",
    "    print(f\"End: {runObj.outputs}\")\n",
    "\n",
    "\n",
    "def fnError(runObj):\n",
    "    print(f\"Error: {runObj.error}\")\n",
    "\n",
    "\n",
    "# Define each Runnable\n",
    "runnable1 = RunnableLambda(stepOne).with_listeners(\n",
    "    on_start=fnStart, on_end=fnEnd, on_error=fnError\n",
    ")\n",
    "\n",
    "runnable2 = RunnableLambda(stepTwo).with_listeners(\n",
    "    on_start=fnStart, on_end=fnEnd, on_error=fnError\n",
    ")\n",
    "\n",
    "# Chain connection\n",
    "chain = runnable1 | runnable2\n",
    "\n",
    "# Execute\n",
    "chain.invoke(\"Hello, World!\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "9908cc92",
   "metadata": {},
   "source": [
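    "You can also attach listeners to an entire LCEL chain using ```with_listeners()```. The chain-level listeners fire once for the whole chain, wrapping the listeners of the individual steps."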
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "id": "cc234be1",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Chain Start: {'input': 'Hello, World!'}\n",
      "Start: {'input': 'Hello, World!'}\n",
      "End: {'output': 'Step 1 completed with message Hello, World!'}\n",
      "Start: {'input': 'Step 1 completed with message Hello, World!'}\n",
      "End: {'output': 'Step 2 completed with message Step 1 completed with message Hello, World!'}\n",
      "Chain End: {'output': 'Step 2 completed with message Step 1 completed with message Hello, World!'}\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "'Step 2 completed with message Step 1 completed with message Hello, World!'"
      ]
     },
     "execution_count": 12,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "def chainStart(runObj):\n",
    "    print(f\"Chain Start: {runObj.inputs}\")\n",
    "\n",
    "\n",
    "def chainEnd(runObj):\n",
    "    print(f\"Chain End: {runObj.outputs}\")\n",
    "\n",
    "\n",
    "chain_with_listeners = chain.with_listeners(\n",
    "    on_start=chainStart, on_end=chainEnd, on_error=fnError\n",
    ")\n",
    "\n",
    "chain_with_listeners.invoke(\"Hello, World!\")"
   ]
  },
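  {
   "cell_type": "markdown",
   "id": "7c1e4a90",
   "metadata": {},
   "source": [
    "The examples above only exercise ```on_start``` and ```on_end```. The sketch below shows ```on_error``` firing. ```failing_step```, ```captured_errors```, and ```on_error_listener``` are hypothetical names introduced for illustration. It assumes that the listener only observes the failure and that the exception still propagates to the caller, so the ```invoke()``` call is wrapped in ```try```/```except```."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3d9b6f25",
   "metadata": {},
   "outputs": [],
   "source": [
    "from langchain_core.runnables import RunnableLambda\n",
    "\n",
    "\n",
    "# Hypothetical step that always fails, used only to trigger on_error\n",
    "def failing_step(message):\n",
    "    raise ValueError(f\"Cannot process: {message}\")\n",
    "\n",
    "\n",
    "# Collect error details so they can be inspected after the run\n",
    "captured_errors = []\n",
    "\n",
    "\n",
    "def on_error_listener(run):\n",
    "    # run.error holds a text description of the failure\n",
    "    captured_errors.append(run.error)\n",
    "    print(f\"Error captured for run {run.id}\")\n",
    "\n",
    "\n",
    "failing_runnable = RunnableLambda(failing_step).with_listeners(\n",
    "    on_error=on_error_listener\n",
    ")\n",
    "\n",
    "try:\n",
    "    failing_runnable.invoke(\"Hello, World!\")\n",
    "except ValueError as exc:\n",
    "    # The listener does not swallow the exception\n",
    "    print(f\"Exception propagated to the caller: {exc}\")"
   ]
  },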
  {
   "cell_type": "markdown",
   "id": "new-markdown-cell",
   "metadata": {},
   "source": [
    "## with_alisteners\n",
    "\n",
    "```with_alisteners()``` binds asynchronous lifecycle listeners to a Runnable, returning a new Runnable.\n",
    "\n",
    "- ```on_start```: Asynchronously called before the Runnable starts running.\n",
    "- ```on_end```: Asynchronously called after the Runnable finishes running.\n",
    "- ```on_error```: Asynchronously called if the Runnable throws an error.\n",
    "\n",
    "The ```Run``` object passed to each listener contains information about the run, including its id, type, input, output, error, start_time, end_time, and any tags or metadata added to the run."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "6a5a85f8",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "runnable2: 25\n",
      "runnable3: 25\n",
      "runnable2: 28\n",
      "runnable3: 28\n",
      "Runnable[2s]: starts at 28\n",
      "Runnable[3s]: starts at 28\n",
      "Runnable[2s]: ends at 30\n",
      "runnable2: 30\n",
      "Runnable[3s]: ends at 31\n",
      "runnable3: 31\n",
      "runnable2: 32\n",
      "runnable3: 33\n"
     ]
    }
   ],
   "source": [
    "import asyncio\n",
    "import time\n",
    "\n",
    "\n",
    "async def testRunnable(time_to_sleep: int):\n",
    "    print(f\"Runnable[{time_to_sleep}s]: starts at {time.strftime('%S')}\")\n",
    "    await asyncio.sleep(time_to_sleep)\n",
    "    print(f\"Runnable[{time_to_sleep}s]: ends at {time.strftime('%S')}\")\n",
    "\n",
    "\n",
    "async def fnStart(runObj):\n",
    "    print(f\"runnable{runObj.inputs['input']}: {time.strftime('%S')}\")\n",
    "    await asyncio.sleep(3)\n",
    "    print(f\"runnable{runObj.inputs['input']}: {time.strftime('%S')}\")\n",
    "\n",
    "\n",
    "async def fnEnd(runObj):\n",
    "    print(f\"runnable{runObj.inputs['input']}: {time.strftime('%S')}\")\n",
    "    await asyncio.sleep(2)\n",
    "    print(f\"runnable{runObj.inputs['input']}: {time.strftime('%S')}\")\n",
    "\n",
    "\n",
    "runnable = RunnableLambda(testRunnable).with_alisteners(on_start=fnStart, on_end=fnEnd)\n",
    "\n",
    "\n",
    "async def concurrentRuns():\n",
    "    await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))\n",
    "\n",
    "\n",
    "await concurrentRuns()"
   ]
  },
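  {
   "cell_type": "markdown",
   "id": "b52f0e6a",
   "metadata": {},
   "source": [
    "The ```Run``` fields listed earlier are also available to synchronous listeners. As a minimal sketch, the ```on_end``` listener below uses ```start_time``` and ```end_time``` to record how long a step took. ```slow_step```, ```durations```, and ```record_duration``` are hypothetical names, and the 0.2-second sleep is an arbitrary stand-in for real work."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9f04d1c8",
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "\n",
    "from langchain_core.runnables import RunnableLambda\n",
    "\n",
    "\n",
    "# Hypothetical step that simulates some work\n",
    "def slow_step(message):\n",
    "    time.sleep(0.2)\n",
    "    return message.upper()\n",
    "\n",
    "\n",
    "# Elapsed seconds for each completed run\n",
    "durations = []\n",
    "\n",
    "\n",
    "def record_duration(run):\n",
    "    # Both timestamps are set by the time on_end is called\n",
    "    elapsed = (run.end_time - run.start_time).total_seconds()\n",
    "    durations.append(elapsed)\n",
    "    print(f\"Run {run.id} took {elapsed:.2f}s\")\n",
    "\n",
    "\n",
    "timed_runnable = RunnableLambda(slow_step).with_listeners(on_end=record_duration)\n",
    "\n",
    "result = timed_runnable.invoke(\"hello\")"
   ]
  },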
  {
   "cell_type": "markdown",
   "id": "b7f88599",
   "metadata": {},
   "source": [
    "## RootListenersTracer\n",
    "\n",
    "You can register event listeners by binding ```RootListenersTracer``` directly to a Runnable with ```RunnableBinding```. This mirrors what ```with_listeners()``` does internally.\n",
    "\n",
    "```RootListenersTracer``` calls listeners on run start, end, and error."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "id": "67194be7",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Start: {'messages': [[{'lc': 1, 'type': 'constructor', 'id': ['langchain', 'schema', 'messages', 'HumanMessage'], 'kwargs': {'content': 'Tell me the founding year of Google', 'type': 'human'}}]]}\n",
      "End: {'generations': [[{'text': 'Google was founded in the year 1998.', 'generation_info': {'finish_reason': 'stop', 'logprobs': None}, 'type': 'ChatGeneration', 'message': {'lc': 1, 'type': 'constructor', 'id': ['langchain', 'schema', 'messages', 'AIMessage'], 'kwargs': {'content': 'Google was founded in the year 1998.', 'additional_kwargs': {'refusal': None}, 'response_metadata': {'token_usage': {'completion_tokens': 11, 'prompt_tokens': 14, 'total_tokens': 25, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_72ed7ab54c', 'finish_reason': 'stop', 'logprobs': None}, 'type': 'ai', 'id': 'run-6f335bec-171d-47a8-a508-85bb52307e10-0', 'usage_metadata': {'input_tokens': 14, 'output_tokens': 11, 'total_tokens': 25, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}}, 'tool_calls': [], 'invalid_tool_calls': []}}}]], 'llm_output': {'token_usage': {'completion_tokens': 11, 'prompt_tokens': 14, 'total_tokens': 25, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_72ed7ab54c'}, 'run': None, 'type': 'LLMResult'}\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "AIMessage(content='Google was founded in the year 1998.', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 11, 'prompt_tokens': 14, 'total_tokens': 25, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_72ed7ab54c', 'finish_reason': 'stop', 'logprobs': None}, id='run-6f335bec-171d-47a8-a508-85bb52307e10-0', usage_metadata={'input_tokens': 14, 'output_tokens': 11, 'total_tokens': 25, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}})"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from langchain_core.tracers.root_listeners import RootListenersTracer\n",
    "from langchain_core.runnables.base import RunnableBinding\n",
    "from langchain_openai import ChatOpenAI\n",
    "\n",
    "\n",
    "# Define listener functions\n",
    "def fnStart(runObj):\n",
    "    print(f\"Start: {runObj.inputs}\")\n",
    "\n",
    "\n",
    "def fnEnd(runObj):\n",
    "    print(f\"End: {runObj.outputs}\")\n",
    "\n",
    "\n",
    "def fnError(runObj):\n",
    "    print(f\"Error: {runObj.error}\")\n",
    "\n",
    "\n",
    "# LLM setup\n",
    "model = ChatOpenAI(model=\"gpt-4o-mini\", temperature=0.0)\n",
    "\n",
    "model_with_listeners = RunnableBinding(\n",
    "    bound=model,\n",
    "    config_factories=[\n",
    "        lambda config: {\n",
    "            \"callbacks\": [\n",
    "                RootListenersTracer(\n",
    "                    config=config,\n",
    "                    on_start=fnStart,\n",
    "                    on_end=fnEnd,\n",
    "                    on_error=fnError,\n",
    "                )\n",
    "            ],\n",
    "        }\n",
    "    ],\n",
    ")\n",
    "\n",
    "model_with_listeners.invoke(\"Tell me the founding year of Google\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "3b89e2b1",
   "metadata": {},
   "source": [
    "### Using tracers without LCEL\n",
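    "You can also call ```on_llm_start()``` and ```on_llm_end()``` of ```RootListenersTracer``` yourself to handle events. The tracer builds its own ```Run``` record from these calls and passes it to the listeners, which is why the printed input below is the ```prompts``` list."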
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "id": "31ce2ca0",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[START] Run ID: a76a54b6-8173-4173-b063-ebe107e52dd3, Start time: 2025-01-12 05:32:32.311749+00:00\n",
      "Input: {'prompts': ['What is the founding year of Google?']}\n",
      "[END] Run ID: a76a54b6-8173-4173-b063-ebe107e52dd3, End time: 2025-01-12 05:32:32.898851+00:00\n",
      "Output: {'generations': [[{'text': 'Google was founded on September 4, 1998.', 'generation_info': {'finish_reason': 'stop', 'logprobs': None}, 'type': 'ChatGeneration', 'message': {'lc': 1, 'type': 'constructor', 'id': ['langchain', 'schema', 'messages', 'AIMessage'], 'kwargs': {'content': 'Google was founded on September 4, 1998.', 'additional_kwargs': {'refusal': None}, 'response_metadata': {'token_usage': {'completion_tokens': 13, 'prompt_tokens': 15, 'total_tokens': 28, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-3.5-turbo-0125', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, 'type': 'ai', 'id': 'run-d0c3617b-05c1-4e34-8fa5-eba2ed0f2748-0', 'usage_metadata': {'input_tokens': 15, 'output_tokens': 13, 'total_tokens': 28, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}}, 'tool_calls': [], 'invalid_tool_calls': []}}}]], 'llm_output': {'token_usage': {'completion_tokens': 13, 'prompt_tokens': 15, 'total_tokens': 28, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-3.5-turbo'}, 'run': [{'run_id': UUID('d0c3617b-05c1-4e34-8fa5-eba2ed0f2748')}], 'type': 'LLMResult'}\n"
     ]
    }
   ],
   "source": [
    "from langchain_core.tracers.schemas import Run\n",
    "import uuid\n",
    "from datetime import datetime, timezone\n",
    "\n",
    "\n",
    "# Define listener functions that receive the Run object\n",
    "def onStart(run: Run):\n",
    "    print(\n",
    "        f\"[START] Run ID: {run.id}, Start time: {run.start_time}\\nInput: {run.inputs}\"\n",
    "    )\n",
    "\n",
    "\n",
    "def onEnd(run: Run):\n",
    "    # Print the run ID, end time, and outputs\n",
    "    print(f\"[END] Run ID: {run.id}, End time: {run.end_time}\\nOutput: {run.outputs}\")\n",
    "\n",
    "\n",
    "def onError(run: Run):\n",
    "    print(f\"[ERROR] Run ID: {run.id}, Error message: {run.error}\")\n",
    "\n",
    "\n",
    "# Create RootListenersTracer\n",
    "tracer = RootListenersTracer(\n",
    "    config={}, on_start=onStart, on_end=onEnd, on_error=onError\n",
    ")\n",
    "\n",
    "# Set up LLM\n",
    "llm = ChatOpenAI()\n",
    "\n",
    "# Input text\n",
    "input_text = \"What is the founding year of Google?\"\n",
    "\n",
    "try:\n",
    "    # Create and initialize Run object at the start of execution\n",
    "    run_id = str(uuid.uuid4())\n",
    "    start_time = datetime.now(timezone.utc)\n",
    "\n",
    "    # Create a Run object for our own bookkeeping (the tracer keeps its own record)\n",
    "    run = Run(\n",
    "        id=run_id,\n",
    "        start_time=start_time,\n",
    "        execution_order=1,\n",
    "        serialized={},\n",
    "        inputs={\"input\": input_text},\n",
    "        run_type=\"llm\",\n",
    "    )\n",
    "\n",
    "    # Call tracer at the start of execution\n",
    "    tracer.on_llm_start(serialized={}, prompts=[input_text], run_id=run_id)\n",
    "\n",
    "    # Call the LLM directly\n",
    "    result = llm.generate([input_text])\n",
    "\n",
    "    # Update Run object\n",
    "    run.end_time = datetime.now(timezone.utc)\n",
    "    run.outputs = result\n",
    "\n",
    "    # Call tracer at the end of execution\n",
    "    tracer.on_llm_end(response=result, run_id=run_id)\n",
    "\n",
    "except Exception as e:\n",
    "    run.error = str(e)\n",
    "    run.end_time = datetime.now(timezone.utc)\n",
    "    tracer.on_llm_error(error=e, run_id=run_id)\n",
    "    print(f\"Error occurred: {str(e)}\")"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "langchain-opentutorial-k6AU65mE-py3.11",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.11"
  },
  "version": "3.11.11"
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
